package com.doit.demo.day06;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @DATE 2022/2/20/14:50
 * @Author MDK
 * @Version 2021.2.2
 *
 * Flink的State分为两种:KeyedState(KeyBy之后对应的State)和OperatorState(没有keyBy的State)
 *
 *  ValueState的底层实现问题
 *   1.KeyedState底层是一个Map结构
 *   2.如果想要容错就必须开启checkPointing机制,并且按照Flink的状态API进行编程(将中间结果都保存在Flink的特殊变量中)
 **/
public class ValueStateDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //开启checkPoint
        env.enableCheckpointing(5000);
        //设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,5000));

        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                if (line.startsWith("error")) {
                    throw new RuntimeException("有错误数据出现,抛异常!!!");
                }
                String[] words = line.split(",");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyedStream.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Integer> valueState;

            //在open方法中初始化状态或恢复状态
            @Override
            public void open(Configuration parameters) throws Exception {
                //定义状态描述器(描述状态的类型、名称)
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
                //初始化或恢复状态
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                Integer current = input.f1;
                //内部会获取当前的key，根据当前的key取出对应的value
                Integer history = valueState.value();
                if (history == null) {
                    history = 0;
                }
                current += history;
                //更新状态
                valueState.update(current);
                //输出数据
                input.f1 = current;
                return input;
            }
        });

        res.print();
        env.execute();

    }
}
